In [1]:
%run startup.py


A Decision Tree of Observable Operators

Part 8: Hot and Cold Observables

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.

This part also assumes familiarity with the marble diagrams feature of RxPY.

I want an Observable that does not start emitting items to subscribers until asked: publish, publish_value, multicast, let/let_bind

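publish is basically multicast with a regular Subject: subscribers are collected first, and nothing is emitted until connect is called.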
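As a rough mental model, the difference between a cold and a published stream can be sketched in plain Python. This is not RxPY's implementation; `cold_source` and `TinyPublished` are names invented for this illustration only.

```python
import random

runs = []  # records every time the producer actually runs


def cold_source(on_next):
    """A cold source: the producer runs once per subscriber."""
    value = random.randint(0, 100)
    runs.append(value)
    on_next(value)


class TinyPublished(object):
    """Hypothetical stand-in for a published (connectable) stream."""

    def __init__(self, source):
        self.source = source
        self.observers = []

    def subscribe(self, on_next):
        # Subscribing only registers the observer, nothing is emitted yet.
        self.observers.append(on_next)

    def connect(self):
        # One single subscription to the source, fanned out to all observers.
        self.source(self._fan_out)

    def _fan_out(self, value):
        for obs in list(self.observers):
            obs(value)


# Cold: two subscribers trigger two independent producer runs.
cold_a, cold_b = [], []
cold_source(cold_a.append)
cold_source(cold_b.append)
cold_runs = len(runs)

# Published: two subscribers, one producer run, same value for both.
hot_a, hot_b = [], []
published = TinyPublished(cold_source)
published.subscribe(hot_a.append)
published.subscribe(hot_b.append)
before_connect = (list(hot_a), list(hot_b))
published.connect()
hot_runs = len(runs) - cold_runs
```

In this model the two cold subscribers may well see different random values, while both published subscribers always see the same one, because the producer ran only once.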


In [3]:
rst(O.publish)
    
def emit(obs):
    log('.........EMITTING........')
    sleep(0.1)
    obs.on_next(rand())
    obs.on_completed()
    
rst(title='Reminder: 2 subscribers on a cold stream:')    
s = O.create(emit)
d = subs(s), subs(s.delay(100))


rst(title='Now 2 subscribers on a PUBLISHED (hot) stream', sleep=0.4)    
sp = s.publish()
subs(sp, name='subs1')
subs(sp.delay(100), name='subs2')
log('now connect')
# this creates a 'single, intermediate subscription between stream and subs' 
d = sp.connect()

# will only see the finish, since subscribed too late
d = subs(sp, name='subs3')



========== publish ==========

module rx.linq.observable.publish
@extensionmethod(ObservableBase)
def publish(self, mapper=None):
    Returns an observable sequence that is the result of invoking the
    mapper on a connectable observable sequence that shares a single
    subscription to the underlying sequence. This operator is a
    specialization of Multicast using a regular Subject.

    Example:
    res = source.publish()
    res = source.publish(lambda x: x)

    mapper -- {Function} [Optional] Selector function which can use the
        multicasted source sequence as many times as needed, without causing
        multiple subscriptions to the source sequence. Subscribers to the
        given source will receive all notifications of the source from the
        time of the subscription on.

    Returns an observable {Observable} sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    mapper function.
--------------------------------------------------------------------------------


========== Reminder: 2 subscribers on a cold stream: ==========


   0.6     M New subscription (81636) on stream 276597469
   1.6     M .........EMITTING........
 107.1     M [next]  106.4: 78 -> 81636  
 107.4     M [cmpl]  106.7: fin -> 81636  

 108.3     M New subscription (94168) on stream 276597493
 108.9     M .........EMITTING........
 214.3     M main thread sleeping 0.4s
 314.0    T4 [next]  205.7: 93 -> 94168  
 314.2    T4 [cmpl]  205.9: fin -> 94168  


========== Now 2 subscribers on a PUBLISHED (hot) stream ==========


   0.7     M New subscription (subs1) on stream 276598873

   1.7     M New subscription (subs2) on stream 276598929
   2.6     M now connect
   3.2     M .........EMITTING........
 105.0     M [next]  104.3: 66 -> subs1  
 106.2     M [cmpl]  105.3: fin -> subs1  

 107.8     M New subscription (subs3) on stream 276598873
 108.2     M [cmpl]    0.3: fin -> subs3  

In [4]:
rst(O.publish_value)

def sideeffect(*x):
    log('sideffect', x)

print('Everybody gets the initial value and the events, sideeffect only once per ev')
src = O.interval(500).take(20).do_action(sideeffect)
published = src.publish_value(42)
subs(published), subs(published.delay(100))
d = published.connect()
sleep(1.3)
log('disposing now')
d.dispose()



========== publish_value ==========

module rx.linq.observable.publishvalue
@extensionmethod(ObservableBase)
def publish_value(self, initial_value, mapper=None):
    Returns an observable sequence that is the result of invoking the
    mapper on a connectable observable sequence that shares a single
    subscription to the underlying sequence and starts with initial_value.

    This operator is a specialization of Multicast using a BehaviorSubject.

    Example:
    res = source.publish_value(42)
    res = source.publish_value(42, lambda x: x.map(lambda y: y * y))

    Keyword arguments:
    initial_value -- {Mixed} Initial value received by observers upon
        subscription.
    mapper -- {Function} [Optional] Optional mapper function which can
        use the multicasted source sequence as many times as needed, without
        causing multiple subscriptions to the source sequence. Subscribers
        to the given source will receive immediately receive the initial
        value, followed by all notifications of the source from the time of
        the subscription on.

    Returns {Observable} An observable sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    mapper function.
--------------------------------------------------------------------------------
Everybody gets the initial value and the events, sideeffect only once per ev

   4.2     M New subscription (81753) on stream 276592833
   4.4     M [next]    0.2: 42 -> 81753  

   5.0     M New subscription (94051) on stream 276592873
  92.0    T5 [next]  209.7: 66 -> subs2  
  92.5    T5 [cmpl]  210.1: fin -> subs2  
 109.9    T6 [next]  104.7: 42 -> 94051  
 508.5    T7 sideffect (0,)
 509.1    T7 [next]  504.9: 0 -> 81753  
 611.6    T8 [next]  606.4: 0 -> 94051  
1012.0    T9 sideffect (1,)
1012.6    T9 [next] 1008.4: 1 -> 81753  
1117.0   T10 [next] 1111.8: 1 -> 94051  
1312.3     M disposing now
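
The docstring above names BehaviorSubject as the subject behind publish_value. A minimal plain-Python model of that behaviour might look as follows; `TinyBehaviorSubject` is a hypothetical class written for this note, not part of RxPY.

```python
class TinyBehaviorSubject(object):
    """Hypothetical model: remembers the latest value for new subscribers."""

    def __init__(self, initial_value):
        self.latest = initial_value
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)
        on_next(self.latest)  # a new subscriber immediately gets the current value

    def on_next(self, value):
        self.latest = value
        for obs in list(self.observers):
            obs(value)


subject = TinyBehaviorSubject(42)
early, late = [], []
subject.subscribe(early.append)  # gets the initial value right away
subject.on_next(0)
subject.subscribe(late.append)   # gets the latest value seen so far
subject.on_next(1)
```

In the notebook run both subscribers arrive before the first interval tick, which is why both of them see 42 first.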

... and then only emits the last item in its sequence: publish_last


In [36]:
# publish_last is not yet available in RxPY (as of the 1.5.7 used here)
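
Since there is nothing to run here, the intended behaviour can at least be sketched in plain Python. In the ReactiveX documentation, PublishLast corresponds to multicasting through an AsyncSubject; the `TinyAsyncSubject` below is a hypothetical model of that idea, not an RxPY class.

```python
class TinyAsyncSubject(object):
    """Hypothetical model: holds back everything except the last value."""

    def __init__(self):
        self.observers = []
        self.last = None
        self.has_value = False

    def subscribe(self, on_next, on_completed):
        self.observers.append((on_next, on_completed))

    def on_next(self, value):
        # Only remember the value; nothing is forwarded yet.
        self.last, self.has_value = value, True

    def on_completed(self):
        # On completion, forward the remembered value (if any), then complete.
        for on_next, on_completed in self.observers:
            if self.has_value:
                on_next(self.last)
            on_completed()


events = []
subject = TinyAsyncSubject()
subject.subscribe(events.append, lambda: events.append('fin'))
for item in (1, 2, 3):
    subject.on_next(item)
seen_before_completion = list(events)
subject.on_completed()
```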

... via multicast

RxPY also has a multicast operator which operates on an ordinary Observable, multicasts that Observable by means of a particular Subject that you specify, applies a transformative function to each emission, and then emits those transformed values as its own ordinary Observable sequence.

Each subscription to this new Observable will trigger a new subscription to the underlying multicast Observable.
The example below follows the RxJS example from the reactivex.io documentation:


In [34]:
rst(O.multicast)
# show actions on intermediate subject:
show = False

def emit(obs):
    'instead of range we allow some logging:'
    for i in (1, 2):
        v = rand()
        log('emitting', v)
        obs.on_next(v)
    log('complete')
    obs.on_completed()
    

class MySubject:
    def __init__(self):
        self.rx_subj = Subject()
        if show:
            log('New Subject %s created' % self)

        
    def __str__(self):
        return str(hash(self))[-4:]
    
    def __getattr__(self, a):
        'called at any attr. access, logging it'
        if not a.startswith('__') and show:
            log('RX called', a, 'on MySub\n')
        return getattr(self.rx_subj, a)
        
        
subject1 = MySubject()
subject2 = MySubject()

source = O.create(emit).multicast(subject2)

# a "subscription" *is* a disposable
# (the normal d we return all the time):
d, observer  = subs(source, return_subscriber=True)
ds1 = subject1.subscribe(observer)
ds2 = subject2.subscribe(observer)
print ('we have now 3 subscriptions, only two will see values.')
print('start multicast stream (calling connect):')
connected = source.connect()
d.dispose()



========== multicast ==========

module rx.linq.observable.multicast
@extensionmethod(ObservableBase)
def multicast(self, subject=None, subject_factory=None, mapper=None):
    Multicasts the source sequence notifications through an instantiated
    subject into all uses of the sequence within a mapper function. Each
    subscription to the resulting sequence causes a separate multicast
    invocation, exposing the sequence resulting from the mapper function's
    invocation. For specializations with fixed subject types, see Publish,
    PublishLast, and Replay.

    Example:
    1 - res = source.multicast(observable)
    2 - res = source.multicast(subject_factory=lambda: Subject(),
                               mapper=lambda x: x)

    Keyword arguments:
    subject_factory -- {Function} Factory function to create an
        intermediate subject through which the source sequence's elements
        will be multicast to the mapper function.
    subject -- Subject {Subject} to push source elements into.
    mapper -- {Function} [Optional] Optional mapper function which can
        use the multicasted source sequence subject to the policies enforced
        by the created subject. Specified only if subject_factory" is a
        factory function.

    Returns an observable {Observable} sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    mapper function.
--------------------------------------------------------------------------------

   2.6     M New subscription (17367) on stream 276597497
we have now 3 subscriptions, only two will see values.
start multicast stream (calling connect):
   5.1     M emitting 10
   5.2     M [next]    2.3: 10 -> 17367  
   5.4     M [next]    2.4: 10 -> 17367  
   6.0     M emitting 67
   6.5     M [next]    3.6: 67 -> 17367  
   6.8     M [next]    3.9: 67 -> 17367  
   6.9     M complete
   7.0     M [cmpl]    4.1: fin -> 17367  
   7.1     M [cmpl]    4.2: fin -> 17367  
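
Why each value shows up twice for the same observer can be reproduced with a small plain-Python model. The names `TinySubject` and `multicast` below are invented for this sketch, and the values 10 and 67 are hard-coded instead of random.

```python
class TinySubject(object):
    """Hypothetical minimal subject: forwards each item to all its observers."""

    def __init__(self):
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)

    def on_next(self, value):
        for obs in list(self.observers):
            obs(value)


def multicast(source, subject):
    """Return (subscribe, connect) for a source multicast through `subject`."""
    def connect():
        source(subject.on_next)  # the single subscription to the source
    return subject.subscribe, connect


def source(on_next):
    for value in (10, 67):
        on_next(value)


subject1, subject2 = TinySubject(), TinySubject()
subscribe, connect = multicast(source, subject2)

seen = []
subscribe(seen.append)           # via the multicast stream
subject1.subscribe(seen.append)  # subject1 is not wired to any source
subject2.subscribe(seen.append)  # directly on the intermediate subject
connect()
```

The observer is registered three times, but only two registrations end up on `subject2`, so it sees every value exactly twice.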

In [58]:
rst(O.let)
# show actions on intermediate subject:
show = True

def emit(obs):
    'instead of range we allow some logging:'
    v = rand()
    log('emitting', v)
    obs.on_next(v)
    log('complete')
    obs.on_completed()
    
source = O.create(emit)

# following the RXJS example:
header("without let")
d = subs(source.concat(source))
d = subs(source.concat(source))

header("now with let")
d = subs(source.let(lambda o: o.concat(o)))
d = subs(source.let(lambda o: o.concat(o)))
# TODO: Not understood:
# "This operator allows for a fluent style of writing queries that use the same sequence multiple times."
# ... I can't verify this, the source sequence is not duplicated but called every time like a cold obs.



========== let_bind ==========

module rx.linq.observable.let
@extensionmethod(Observable, alias="let")
def let_bind(self, func):
    Returns an observable sequence that is the result of invoking the
    mapper on the source sequence, without sharing subscriptions. This
    operator allows for a fluent style of writing queries that use the same
    sequence multiple times.

    mapper -- {Function} Selector function which can use the source
        sequence as many times as needed, without sharing subscriptions to
        the source sequence.

    Returns an observable {Observable} sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    mapper function.
--------------------------------------------------------------------------------


========== without let ==========


   1.4     M New subscription (46742) on stream 276639557
   1.8     M emitting 99
   2.0     M [next]    0.4: 99 -> 46742  
   2.2     M complete
   2.4     M emitting 47
   2.6     M [next]    1.1: 47 -> 46742  
   2.7     M complete
   2.9     M [cmpl]    1.3: fin -> 46742  

   3.5     M New subscription (16738) on stream 276625117
   3.8     M emitting 62
   3.9     M [next]    0.3: 62 -> 16738  
   4.0     M complete
   4.3     M emitting 100
   4.4     M [next]    0.8: 100 -> 16738  
   4.5     M complete
   4.6     M [cmpl]    1.0: fin -> 16738  


========== now with let ==========


   5.3     M New subscription (59075) on stream 276641713
   5.6     M emitting 27
   5.8     M [next]    0.4: 27 -> 59075  
   5.8     M complete
   5.9     M emitting 86
   6.1     M [next]    0.7: 86 -> 59075  
   6.3     M complete
   6.4     M [cmpl]    1.1: fin -> 59075  

   7.4     M New subscription (59039) on stream 276625113
   7.7     M emitting 26
   7.9     M [next]    0.5: 26 -> 59039  
   8.2     M complete
   8.6     M emitting 39
   8.8     M [next]    1.3: 39 -> 59039  
   9.0     M complete
   9.3     M [cmpl]    1.8: fin -> 59039  
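
One possible reading of the let_bind docstring ("without sharing subscriptions") is that let simply hands the source to the given function, so identical output with and without let is expected. The plain-Python sketch below models that reading; `cold_source`, `concat` and `let` are hypothetical helpers, not RxPY functions.

```python
subscriptions = []  # one entry per subscription to the cold source


def cold_source():
    """Hypothetical cold source: every call is a fresh subscription."""
    subscriptions.append('subscribed')
    return [len(subscriptions)]  # emits its own subscription number


def concat(first, second):
    """Subscribe to `first`, then to `second`, and chain their items."""
    return lambda: first() + second()


def let(source, func):
    """Model of let/let_bind: simply hand the source to func."""
    return func(source)


without_let = concat(cold_source, cold_source)()
with_let = let(cold_source, lambda o: concat(o, o))()
```

Under this reading the benefit of let is purely stylistic: the source expression is written once and reused inside the function, while every use still causes its own subscription.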

... and then emits the complete sequence, even to those who subscribe after the sequence has begun: replay

A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.


In [39]:
rst(O.replay)

def emit(obs):
    'continuous emission'
    for i in range(0, 5):
        v = 'nr %s, value %s' % (i, rand())
        log('emitting', v, '\n')
        obs.on_next(v)
        sleep(0.2)    
    

def sideeffect(*v):
    log("sync sideeffect (0.2s)", v, '\n')
    sleep(0.2)
    log("end sideeffect", v, '\n')
    

def modified_stream(o):
    log('modified_stream (take 2)')
    return o.map(lambda x: 'MODIFIED FOR REPLAY: %s' % x).take(2)

header("playing and replaying...")
subject = Subject()
cold = O.create(emit).take(3).do_action(sideeffect)

assert not getattr(cold, 'connect', None)
hot = cold.multicast(subject)
connect = hot.connect # present now.

#d, observer = subs(hot, return_subscriber=True, name='normal subscriber\n')
#d1 = subject.subscribe(observer)

published = hot.replay(modified_stream, 1000, 50000)
d2 = subs(published, name='Replay Subs 1\n')


#header("replaying again")
#d = subs(published, name='Replay Subs 2\n')
log('calling connect now...')
d3 = hot.connect()



========== replay ==========

module rx.linq.observable.replay
@extensionmethod(ObservableBase)
def replay(self, mapper, buffer_size=None, window=None, scheduler=None):
    Returns an observable sequence that is the result of invoking the
    mapper on a connectable observable sequence that shares a single
    subscription to the underlying sequence replaying notifications subject
    to a maximum time length for the replay buffer.

    This operator is a specialization of Multicast using a ReplaySubject.

    Example:
    res = source.replay(buffer_size=3)
    res = source.replay(buffer_size=3, window=500)
    res = source.replay(None, 3, 500, scheduler)
    res = source.replay(lambda x: x.take(6).repeat(), 3, 500, scheduler)

    Keyword arguments:
    mapper -- [Optional] Selector function which can use the multicasted
        source sequence as many times as needed, without causing multiple
        subscriptions to the source sequence. Subscribers to the given
        source will receive all the notifications of the source subject to
        the specified replay buffer trimming policy.
    buffer_size -- [Optional] Maximum element count of the replay buffer.
    window -- [Optional] Maximum time length of the replay buffer.
    scheduler -- [Optional] Scheduler where connected observers within the
        mapper function will be invoked on.

    Returns {Observable} An observable sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    mapper function.
--------------------------------------------------------------------------------


========== playing and replaying... ==========


   2.8     M New subscription (Replay Subs 1) on stream 276680345
   3.6     M modified_stream (take 2)
   4.4     M calling connect now...
   5.3     M emitting nr 0, value 229 

   5.5     M sync sideeffect (0.2s) ('nr 0, value 229',) 

 207.6     M end sideeffect ('nr 0, value 229',) 

 409.5     M emitting nr 1, value 981 

 409.9     M sync sideeffect (0.2s) ('nr 1, value 981',) 

 612.9     M end sideeffect ('nr 1, value 981',) 

 818.6     M emitting nr 2, value 773 

 819.5     M sync sideeffect (0.2s) ('nr 2, value 773',) 

1025.2     M end sideeffect ('nr 2, value 773',) 

1228.8     M emitting nr 3, value 711 

1429.7     M emitting nr 4, value 427 

1635.6     M [next] 1632.7: MODIFIED FOR REPLAY: nr 0, value 229 -> Replay Subs 1

1635.9     M [next] 1633.1: MODIFIED FOR REPLAY: nr 1, value 981 -> Replay Subs 1

1636.3     M [cmpl] 1633.5: fin -> Replay Subs 1

If you apply the Replay operator to an Observable before you convert it into a connectable Observable, the resulting connectable Observable will always emit the same complete sequence to any future observers, even those observers that subscribe after the connectable Observable has begun to emit items to other subscribed observers(!)
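
The replay docstring names ReplaySubject as the underlying subject. A minimal plain-Python model of its count-based buffer could look like this; `TinyReplaySubject` is a hypothetical class for illustration, and the time window parameter is left out entirely.

```python
from collections import deque


class TinyReplaySubject(object):
    """Hypothetical model: buffers the last `buffer_size` items for late subscribers."""

    def __init__(self, buffer_size=None):
        self.buffer = deque(maxlen=buffer_size)  # None means unbounded
        self.observers = []

    def subscribe(self, on_next):
        for value in self.buffer:  # replay what was missed
            on_next(value)
        self.observers.append(on_next)

    def on_next(self, value):
        self.buffer.append(value)
        for obs in list(self.observers):
            obs(value)


full = TinyReplaySubject()
bounded = TinyReplaySubject(buffer_size=2)
for item in (0, 1, 2):
    full.on_next(item)
    bounded.on_next(item)

late_full, late_bounded = [], []
full.subscribe(late_full.append)        # replays 0, 1, 2
bounded.subscribe(late_bounded.append)  # replays only the last two items
full.on_next(3)                         # live items still arrive afterwards
```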

In [23]:
def mark(x):
    return 'marked %s' % x
def side_effect(x):
    log('sideeffect %s\n' % x)
    
    
for i in 1, 2:
    s = O.interval(100).take(3).do_action(side_effect)
    if i == 2:
        sleep(1)
        header("now with publish - no more sideeffects in the replays")
        s = s.publish()
        
    reset_start_time()
    published = s.replay(lambda o: o.map(mark).take(3).repeat(2), 3)
    
    d = subs(s,         name='Normal\n')
    d = subs(published, name='Replayer A\n')
    d = subs(published, name='Replayer B\n')
    if i == 2:
        d = s.connect()



========== True ==========


   0.5     M New subscription (Normal) on stream 276498261

   1.8     M New subscription (Replayer A) on stream 276487297

   4.3     M New subscription (Replayer B) on stream 276487297
 105.8  T160 sideeffect 0

 106.5  T162 sideeffect 0
 106.1  T161 sideeffect 0
 106.8  T160 [next]  106.2: 0 -> Normal



 108.4  T162 [next]  104.1: marked 0 -> Replayer B
 108.1  T161 [next]  105.7: marked 0 -> Replayer A


 214.5  T164 sideeffect 1
 214.8  T165 sideeffect 1
 215.2  T163 sideeffect 1



 215.9  T164 [next]  215.3: 1 -> Normal
 215.9  T165 [next]  213.5: marked 1 -> Replayer A
 216.7  T163 [next]  212.4: marked 1 -> Replayer B



 318.1  T166 sideeffect 2

 318.7  T166 [next]  316.3: marked 2 -> Replayer A

 320.0  T166 [next]  317.7: marked 0 -> Replayer A

 320.7  T166 [next]  318.4: marked 1 -> Replayer A

 321.4  T166 [next]  319.0: marked 2 -> Replayer A

 321.9  T166 [cmpl]  319.5: fin -> Replayer A

 323.3  T167 sideeffect 2
 323.6  T168 sideeffect 2


 325.0  T168 [next]  320.7: marked 2 -> Replayer B
 323.6  T167 [next]  323.0: 2 -> Normal


 326.4  T168 [next]  322.1: marked 0 -> Replayer B
 324.4  T167 [cmpl]  323.8: fin -> Normal


 327.0  T168 [next]  322.7: marked 1 -> Replayer B

 327.4  T168 [next]  323.1: marked 2 -> Replayer B

 327.5  T168 [cmpl]  323.2: fin -> Replayer B



========== now with publish - no more sideeffects in the replays ==========



========== True ==========


   0.2     M New subscription (Normal) on stream 276784605

   0.7     M New subscription (Replayer A) on stream 276784601

   2.9     M New subscription (Replayer B) on stream 276784601
 109.3  T172 sideeffect 0

 109.9  T172 [next]  109.6: 0 -> Normal

 110.6  T172 [next]  109.7: marked 0 -> Replayer A

 111.5  T172 [next]  108.5: marked 0 -> Replayer B

 214.4  T173 sideeffect 1

 214.7  T173 [next]  214.4: 1 -> Normal

 215.1  T173 [next]  214.1: marked 1 -> Replayer A

 215.4  T173 [next]  212.4: marked 1 -> Replayer B

 321.3  T174 sideeffect 2

 321.6  T174 [next]  321.3: 2 -> Normal

 322.0  T174 [next]  321.0: marked 2 -> Replayer A

 322.9  T174 [next]  321.9: marked 0 -> Replayer A

 323.3  T174 [next]  322.3: marked 1 -> Replayer A

 323.8  T174 [next]  322.9: marked 2 -> Replayer A

 323.9  T174 [cmpl]  322.9: fin -> Replayer A

 325.0  T174 [next]  322.0: marked 2 -> Replayer B

 326.5  T174 [next]  323.5: marked 0 -> Replayer B

 327.0  T174 [next]  324.0: marked 1 -> Replayer B

 327.1  T174 [next]  324.1: marked 2 -> Replayer B

 327.2  T174 [cmpl]  324.2: fin -> Replayer B

 327.9  T174 [cmpl]  327.7: fin -> Normal


... but I want it to go away once all of its subscribers unsubscribe: ref_count, share

The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. It operates on a connectable Observable and returns an ordinary Observable. When the first observer subscribes to this Observable, RefCount connects to the underlying connectable Observable. RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has done so.
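
The counting logic described here can be sketched in a few lines of plain Python. `TinyRefCount` and the `connect` callable are assumptions made for this illustration; RxPY's actual ref_count works on a ConnectableObservable and returns Disposables.

```python
class TinyRefCount(object):
    """Hypothetical model of ref_count on top of a connectable stream."""

    def __init__(self, connect):
        self.connect = connect   # callable that returns a disconnect callable
        self.count = 0
        self.disconnect = None

    def subscribe(self):
        self.count += 1
        if self.count == 1:      # first subscriber: connect
            self.disconnect = self.connect()
        return self.unsubscribe

    def unsubscribe(self):
        self.count -= 1
        if self.count == 0:      # last subscriber gone: disconnect
            self.disconnect()
            self.disconnect = None


events = []


def connect():
    events.append('connected')
    return lambda: events.append('disconnected')


shared = TinyRefCount(connect)
unsub_a = shared.subscribe()
unsub_b = shared.subscribe()
after_two_subscribers = list(events)
unsub_a()
after_first_leaves = list(events)
unsub_b()
```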


In [33]:
rst(O.interval(1).publish)
publ = O.interval(1000).take(2).publish().ref_count()
# be aware of potential race conditions here
subs(publ)
subs(publ)



========== publish ==========

module rx.linq.observable.publish
@extensionmethod(ObservableBase)
def publish(self, mapper=None):
    Returns an observable sequence that is the result of invoking the
    mapper on a connectable observable sequence that shares a single
    subscription to the underlying sequence. This operator is a
    specialization of Multicast using a regular Subject.

    Example:
    res = source.publish()
    res = source.publish(lambda x: x)

    mapper -- {Function} [Optional] Selector function which can use the
        multicasted source sequence as many times as needed, without causing
        multiple subscriptions to the source sequence. Subscribers to the
        given source will receive all notifications of the source from the
        time of the subscription on.

    Returns an observable {Observable} sequence that contains the elements
    of a sequence produced by multicasting the source sequence within a
    mapper function.
--------------------------------------------------------------------------------

   2.2     M New subscription (00656) on stream 276501981

   3.8     M New subscription (75193) on stream 276501981
Out[33]:
<rx.disposable.Disposable.Disposable at 0x107b15390>
1006.1  T182 [next] 1003.8: 0 -> 00656
1006.6  T182 [next] 1002.8: 0 -> 75193
2009.3  T183 [next] 2007.0: 1 -> 00656
2010.0  T183 [next] 2006.1: 1 -> 75193
2010.4  T183 [cmpl] 2008.1: fin -> 00656
2010.6  T183 [cmpl] 2006.8: fin -> 75193

In [41]:
rst(O.interval(1).share)
def sideeffect(v):
    log('sideeffect %s\n' % v)
publ = O.interval(200).take(2).do_action(sideeffect).share()

# When the number of observers subscribed to the published observable goes from
# 0 to 1, we connect to the underlying observable sequence.
# When the second subscriber is added, no additional subscriptions are added to the
# underlying observable sequence. As a result, the operations that result in side
# effects are not repeated per subscriber.
subs(publ, name='SourceA')
subs(publ, name='SourceB')



========== share ==========

module rx.linq.observable.publish
@extensionmethod(ObservableBase)
def share(self):
    Share a single subscription among multple observers.

    Returns a new Observable that multicasts (shares) the original
    Observable. As long as there is at least one Subscriber this
    Observable will be subscribed and emitting data. When all
    subscribers have unsubscribed it will unsubscribe from the source
    Observable.

    This is an alias for Observable.publish().ref_count().
--------------------------------------------------------------------------------

   1.3     M New subscription (SourceA) on stream 276514157

   2.8     M New subscription (SourceB) on stream 276514157
Out[41]:
<rx.disposable.Disposable.Disposable at 0x107b455d0>
 206.9  T190 sideeffect 0
 207.4  T190 [next]  205.9: 0 -> SourceA
 207.5  T190 [next]  204.5: 0 -> SourceB
 410.9  T191 sideeffect 1
 411.2  T191 [next]  409.7: 1 -> SourceA
 411.3  T191 [next]  408.2: 1 -> SourceB
 411.4  T191 [cmpl]  409.8: fin -> SourceA
 411.6  T191 [cmpl]  408.5: fin -> SourceB

... and then I want to ask it to start: connect

You can use the publish operator to convert an ordinary Observable into a ConnectableObservable.

Call a ConnectableObservable’s connect method to instruct it to begin emitting the items from its underlying Observable to its Subscribers.

The connect method returns a Disposable. You can call that Disposable object’s dispose method to instruct the Observable to stop emitting items to its Subscribers.

You can also use the connect method to instruct an Observable to begin emitting items (or, to begin generating items that would be emitted) even before any Subscriber has subscribed to it.

In this way you can turn a cold Observable into a hot one.
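
The connect/dispose life cycle described above can be modelled in plain Python. `TinyConnectable` and its `produce` method are hypothetical; in RxPY, connect returns a Disposable object instead of a bare callable.

```python
class TinyConnectable(object):
    """Hypothetical model: items flow only between connect() and dispose()."""

    def __init__(self):
        self.observers = []
        self.connected = False

    def subscribe(self, on_next):
        self.observers.append(on_next)

    def connect(self):
        self.connected = True
        return self.dispose  # stands in for the Disposable

    def dispose(self):
        self.connected = False

    def produce(self, value):
        """Called by the underlying source for every item it generates."""
        if self.connected:
            for obs in list(self.observers):
                obs(value)


stream = TinyConnectable()
dispose = stream.connect()  # hot from now on, even without subscribers
stream.produce(0)           # nobody listens yet: the item is lost
late = []
stream.subscribe(late.append)
stream.produce(1)
dispose()
stream.produce(2)           # after dispose nothing is forwarded
```

The late subscriber misses item 0 and never sees item 2, much like the notebook subscriber that arrives after 0.5 seconds and only sees the items from 5 onwards.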


In [62]:
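rst(O.interval(1).publish().connect)

def emit(obs):
    for i in range(0, 10):
        log('emitting', i, obs.__class__.__name__, hash(obs))
        # going nowhere until somebody has subscribed
        obs.on_next(i)
        sleep(0.1)

# emit has to be defined before it is handed to O.create
published = O.create(emit).publish()

import thread  # Python 2 module (renamed to _thread in Python 3)
# connect on a separate thread, so the stream is already hot ...
thread.start_new_thread(published.connect, ())
sleep(0.5)
# ... when this late subscriber arrives
d = subs(published, scheduler=new_thread_scheduler)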



========== connect ==========

module rx.linq.connectableobservable
def connect(self):
    Connects the observable.
--------------------------------------------------------------------------------

 507.1     M New subscription (14637) on stream 276493689
 517.2  T193 [next]    9.9: 5 -> 14637
 622.8  T193 [next]  115.6: 6 -> 14637
 724.9  T193 [next]  217.7: 7 -> 14637
 826.5  T193 [next]  319.3: 8 -> 14637
 931.7  T193 [next]  424.5: 9 -> 14637
